/*******************************************************************************
 * Copyright (c) 2010, 2013 IBM Corp.
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Public License v1.0
 * and Eclipse Distribution License v1.0 which accompany this distribution. 
 *
 * The Eclipse Public License is available at 
 *    http://www.eclipse.org/legal/epl-v10.html
 * and the Eclipse Distribution License is available at 
 *   http://www.eclipse.org/org/documents/edl-v10.php.
 *
 * Contributors:
 *    Ian Craggs - initial API and implementation and/or initial documentation
 *******************************************************************************/

package org.eclipse.paho.mqttsn.udpclient;


import java.net.InetAddress;
import java.net.UnknownHostException;
//import java.util.Hashtable;








import org.eclipse.paho.mqttsn.udpclient.exceptions.MqttsException;
import org.eclipse.paho.mqttsn.udpclient.messages.Message;
import org.eclipse.paho.mqttsn.udpclient.messages.control.ControlMessage;
import org.eclipse.paho.mqttsn.udpclient.messages.mqttsn.MqttsConnack;
import org.eclipse.paho.mqttsn.udpclient.messages.mqttsn.MqttsConnect;
import org.eclipse.paho.mqttsn.udpclient.messages.mqttsn.MqttsDisconnect;
import org.eclipse.paho.mqttsn.udpclient.messages.mqttsn.MqttsMessage;
import org.eclipse.paho.mqttsn.udpclient.messages.mqttsn.MqttsPingReq;
import org.eclipse.paho.mqttsn.udpclient.messages.mqttsn.MqttsPingResp;
import org.eclipse.paho.mqttsn.udpclient.messages.mqttsn.MqttsPubComp;
import org.eclipse.paho.mqttsn.udpclient.messages.mqttsn.MqttsPubRec;
import org.eclipse.paho.mqttsn.udpclient.messages.mqttsn.MqttsPubRel;
import org.eclipse.paho.mqttsn.udpclient.messages.mqttsn.MqttsPuback;
import org.eclipse.paho.mqttsn.udpclient.messages.mqttsn.MqttsPublish;
import org.eclipse.paho.mqttsn.udpclient.messages.mqttsn.MqttsRegack;
import org.eclipse.paho.mqttsn.udpclient.messages.mqttsn.MqttsRegister;
import org.eclipse.paho.mqttsn.udpclient.messages.mqttsn.MqttsSuback;
import org.eclipse.paho.mqttsn.udpclient.messages.mqttsn.MqttsSubscribe;
import org.eclipse.paho.mqttsn.udpclient.messages.mqttsn.MqttsUnsuback;
import org.eclipse.paho.mqttsn.udpclient.messages.mqttsn.MqttsUnsubscribe;
import org.eclipse.paho.mqttsn.udpclient.messages.mqttsn.MqttsWillMsg;
import org.eclipse.paho.mqttsn.udpclient.messages.mqttsn.MqttsWillMsgReq;
import org.eclipse.paho.mqttsn.udpclient.messages.mqttsn.MqttsWillMsgResp;
import org.eclipse.paho.mqttsn.udpclient.messages.mqttsn.MqttsWillTopic;
import org.eclipse.paho.mqttsn.udpclient.messages.mqttsn.MqttsWillTopicReq;
import org.eclipse.paho.mqttsn.udpclient.messages.mqttsn.MqttsWillTopicResp;
import org.eclipse.paho.mqttsn.udpclient.timer.TimerService;
import org.eclipse.paho.mqttsn.udpclient.udp.UDPInterface;
import org.eclipse.paho.mqttsn.udpclient.utils.ClientLogger;
import org.eclipse.paho.mqttsn.udpclient.utils.ClientParameters;
import org.eclipse.paho.mqttsn.udpclient.utils.MsgQueue;
import org.eclipse.paho.mqttsn.udpclient.utils.Utils;


public class MqttsClient implements Runnable{

	public static final String version="140217";

	private static final int MQTTS_BACKUP_MESSAGE = 0x00;
	private static final int MQTTS_BACKUP_SEND_MESSAGE = 0x01;

	private MqttsMessage message = null;

	private int msgId;
	private ClientState clState; 
	private volatile boolean running;

	private boolean lostGw = false;
	private MqttsCallback callback = null;
	private UDPInterface udpInterface = null;
	private MsgQueue queue = null;
	private Thread readThread = null;
	private TimerService timer = null;
	private ClientParameters clientParms = null;

	private String clientid;
	private boolean will;
	private boolean cleanstart;
	private int keepalive;
	private String willtopic;
	private int willQoS;
	private String willmsg;
	private boolean willretain;

	private boolean autoReconnect = false;

	int ackMissedCounter = 0;


	public MqttsClient(String gatewayAddress, int gatewayPort, int maxMqttsMsgLength, int minMqttsMsgLength, 
			int maxRetries, int ackWaitingTime, boolean autoReconnect){
		
		InetAddress adr = null;
		try {
			adr = InetAddress.getByName(gatewayAddress);
			clientParms = new ClientParameters();
			clientParms.setGatewayAddress(adr);
			clientParms.setGatewayPort(gatewayPort);
			
			clientParms.setMaxMqttsLength(maxMqttsMsgLength);
			clientParms.setMinMqttsLength(minMqttsMsgLength);
			clientParms.setMaxRetries(maxRetries);
			clientParms.setWaitingTime(ackWaitingTime);

			this.clState = ClientState.NOT_ACTIVE;
			this.msgId = 1;
			this.autoReconnect= autoReconnect;

			queue = new MsgQueue();
			timer = new TimerService(queue);

			udpInterface = new UDPInterface();
			udpInterface.initialize(queue, clientParms);	

			//create thread for reading
			this.readThread = new Thread (this, "MqttsClient");
			this.running = true;
			this.readThread.start();
			
			String s = null;
			if (UDPInterface.ENCAPS) s=" with "; else s=" without ";
			ClientLogger.log(ClientLogger.INFO, "MQTT-S client version "+
					version + s + ("encapsulation started ..."));
			System.out.println("MQTT-S client version "+ version + s + ("encapsulation started ..."));

		} catch (MqttsException e) {
			ClientLogger.log(ClientLogger.ERROR, ""+e); 
			e.printStackTrace();
		} catch (UnknownHostException e) {
			ClientLogger.log(ClientLogger.ERROR, ""+e); 
			e.printStackTrace();
		}
	}

	public MqttsClient(String gatewayAddress, int gatewayPort,
			int maxMqttsMsgLength, int minMqttsMsgLength, 
			int maxRetries, int ackWaitingTime) {
		
		this(gatewayAddress,gatewayPort,maxMqttsMsgLength,minMqttsMsgLength,
				maxRetries,ackWaitingTime,false);

	}
	
	public MqttsClient(String gatewayAddress, int gatewayPort) {
		this(gatewayAddress,gatewayPort,
				256, 		//max mqtts message length
				2,			//min mqtts message length
				2,			//max number retries
				5,			//ack waiting time
				false);		//auto reconnect
	}
	
	public MqttsClient(String gatewayAddress, int gatewayPort, boolean auto) {
		this(gatewayAddress,gatewayPort,
				256, 		//max mqtts message length
				2,			//min mqtts message length
				2,			//max number retries
				5,			//ack waiting time
				auto);		//auto reconnect
	}
	
	
	
	/**
	 * Registers the callback handler of the application.
	 * This handler is used to notify the app about the completion of async events.
	 */
	public void registerHandler(MqttsCallback handler) {
		this.callback = handler;
		this.clState = ClientState.WAITING_CONNECT;
	}

	public boolean connect(String clientid, boolean cleanstart, int keepalive, 
			String willtopic, int willQoS, String willmsg, boolean willretain) {
	
		if (this.clState != ClientState.WAITING_CONNECT) {
			ClientLogger.log(ClientLogger.WARN, "connect() ignored!");
			//System.out.println("mqttsClient>> connect ignored");
			callback.disconnected(MqttsCallback.MQTTS_ERR_STACK_NOT_READY);
			return false;
		}
	
		this.clientid = clientid;
		this.cleanstart = cleanstart;
		this.keepalive = keepalive;
		this.will = true;
		this.willtopic = willtopic;
		this.willQoS = willQoS;
		this.willmsg = willmsg;
		this.willretain = willretain;
	
		clState=ClientState.CONNECTING_TO_GW;
		mqtts_connecting(this.clientid, this.will, this.cleanstart, this.keepalive);
		return true;
	}

	public boolean connect(String clientid, boolean cleanstart, short keepalive){
	
		if (this.clState != ClientState.WAITING_CONNECT) {
			ClientLogger.log(ClientLogger.WARN, "connect() ignored!");
			callback.disconnected(MqttsCallback.MQTTS_ERR_STACK_NOT_READY);
			return false;
		}
	
		this.clientid = clientid;
		this.cleanstart = cleanstart;
		this.keepalive = keepalive;
		this.will = false;
	
		this.clState= ClientState.CONNECTING_TO_GW;
		mqtts_connecting(this.clientid, this.will, this.cleanstart, this.keepalive);
		return true;
	}

	public boolean disconnect(){
	
		MqttsDisconnect msg = new MqttsDisconnect();
	
		switch (this.clState) {
		case NOT_ACTIVE:
		case WAITING_CONNECT:
			ClientLogger.log(ClientLogger.WARN,"already disconnected, disconnect() ignored");
			break;
	
		case CONNECTING_TO_GW:
		case SEARCHING_GW:
			// no need for sending a DISC since we are not connected 
			this.clState = ClientState.WAITING_CONNECT;
			timer.unregisterAll();
			callback.disconnected(MqttsCallback.MQTTS_OK);
			break;				
	
		case READY:	  		
		case WAITING_ACK:
			// send DISC to gw and wait for DISC
			mqtts_backup(MQTTS_BACKUP_MESSAGE, msg);
			timer.register(ControlMessage.ACK, clientParms.getWaitingTime());
			timer.unregister(ControlMessage.KEEP_ALIVE);
			this.clState = ClientState.DISCONNECTING;
			ClientLogger.log(ClientLogger.INFO,"DISCONNECT sent: "+ Utils.hexString(msg.toBytes()));
			udpInterface.sendMsg(msg);
			
			break;		
	
		default:
			break;
		}  	
		return true;
	}

	public boolean publish(int topicId, byte[] message, int qos, boolean retain){
		return publish(0, topicId, message, qos, retain);
	}
	public boolean publish(int topicIdType, int topicId, byte[] message, int qos, boolean retain){
	
		if (this.clState != ClientState.READY) {
			ClientLogger.log(ClientLogger.WARN, "client not ready, publish() ignored! "+"Client state = "+ this.clState);
			return false;
		}
	
		MqttsPublish msg = new MqttsPublish();
	
		switch (qos)   {
		case 2:
			msg.setQos(qos);
			int id = getNewMsgId();
			msg.setMsgId(id);
			break;
	
		case 1:
			msg.setQos(qos);
			id = getNewMsgId();
			msg.setMsgId(id);
			break;
	
		default:
			msg.setQos(qos);
		msg.setMsgId(0);
		break;
		}
	
		msg.setRetain(retain);
		msg.setTopicId(topicId);
		msg.setTopicIdType(topicIdType);
		msg.setData(message);
	
	
		if(qos !=0){  
			/* backup the message for the ack */
			mqtts_backup(MQTTS_BACKUP_MESSAGE, msg);
	
			/* timers */
			timer.register(ControlMessage.ACK, clientParms.getWaitingTime());
			timer.register(ControlMessage.KEEP_ALIVE, clientParms.getKeepAlivePeriod());
			clState= ClientState.WAITING_ACK;
		}
		/* Send the message */
		ClientLogger.log(ClientLogger.INFO, "Send PUBLISH to gateway: "+ Utils.hexString(msg.toBytes()));
		udpInterface.sendMsg(msg);
		return true;
	}

	public boolean subscribe(String topicName, int qos, int topicIdType){
		if (this.clState != ClientState.READY) {
			ClientLogger.log(ClientLogger.WARN, "client not ready, subscribe() ignored!");
			return false;
		}
	
		MqttsSubscribe msg = new MqttsSubscribe();
		msg.setQos(qos);
		if (topicIdType == MqttsMessage.TOPIC_NAME){
			msg.setTopicIdType(MqttsMessage.TOPIC_NAME);
			msg.setTopicName(topicName);
		}			
		if (topicIdType == MqttsMessage.SHORT_TOPIC_NAME){
			msg.setTopicIdType(MqttsMessage.SHORT_TOPIC_NAME);
			msg.setShortTopicName(topicName);
		}
	
		int id = getNewMsgId();
		msg.setMsgId(id);
	
		/* backup the message for the ack */
		mqtts_backup(MQTTS_BACKUP_MESSAGE, msg);
	
		/* timers */
		timer.register(ControlMessage.ACK, clientParms.getWaitingTime());
		timer.register(ControlMessage.KEEP_ALIVE, clientParms.getKeepAlivePeriod());
	
		clState= ClientState.WAITING_ACK;
		/* Send the message */
		ClientLogger.log(ClientLogger.INFO, "Send SUBSCRIBE to gateway: "+ Utils.hexString(msg.toBytes()));
		udpInterface.sendMsg(msg);
		return true;
	}

	public boolean subscribe(int topicId, int qos){
		if (this.clState != ClientState.READY) {
			ClientLogger.log(ClientLogger.WARN, "client not ready, subscribe2() ignored!");
			return false;
		}
	
		MqttsSubscribe msg = new MqttsSubscribe();
		msg.setQos(qos);
		msg.setTopicIdType(MqttsMessage.PREDIFINED_TOPIC_ID);
		msg.setPredefinedTopicId(topicId);
		int id = getNewMsgId();
		msg.setMsgId(id);
	
		/* backup the message for the ack */
		mqtts_backup(MQTTS_BACKUP_MESSAGE, msg);
	
		/* timers */
		timer.register(ControlMessage.ACK, clientParms.getWaitingTime());
		timer.register(ControlMessage.KEEP_ALIVE, clientParms.getKeepAlivePeriod());
	
		clState= ClientState.WAITING_ACK;
		/* Send the message */
		ClientLogger.log(ClientLogger.INFO, "Send SUBSCRIBE to gateway: "+ Utils.hexString(msg.toBytes()));
		udpInterface.sendMsg(msg);
		return true;
	}

	public boolean unSubscribe(String topicName, int topicIdType){
		if (this.clState != ClientState.READY) {
			ClientLogger.log(ClientLogger.WARN, "client not ready, unsubscribe1() ignored!");
			return false;
		}
	
		MqttsUnsubscribe msg= new MqttsUnsubscribe();
	
		if (topicIdType == MqttsMessage.TOPIC_NAME){
			msg.setTopicIdType(MqttsMessage.TOPIC_NAME);
			msg.setTopicName(topicName);
		}			
		if (topicIdType == MqttsMessage.SHORT_TOPIC_NAME){
			msg.setTopicIdType(MqttsMessage.SHORT_TOPIC_NAME);
			msg.setShortTopicName(topicName);
		}
	
		int id = getNewMsgId();
		msg.setMsgId(id);
	
		/* backup the message for the ack */
		mqtts_backup(MQTTS_BACKUP_MESSAGE, msg);
	
		/* timers */
		timer.register(ControlMessage.ACK, clientParms.getWaitingTime());
		timer.register(ControlMessage.KEEP_ALIVE, clientParms.getKeepAlivePeriod());
	
		clState= ClientState.WAITING_ACK;	
		/* Send the message */
		ClientLogger.log(ClientLogger.INFO, "Send UNSUBSCRIBE to gateway: "+ Utils.hexString(msg.toBytes()));
		udpInterface.sendMsg(msg);
		return true;
	}

	public boolean unSubscribe(int topicId){
		if (this.clState != ClientState.READY) {
			ClientLogger.log(ClientLogger.WARN, "client not ready, subscribe2() ignored!");
			return false;
		}
	
		MqttsUnsubscribe msg= new MqttsUnsubscribe();
	
		msg.setTopicIdType(MqttsMessage.PREDIFINED_TOPIC_ID);
		msg.setPredefinedTopicId(topicId);
		int id = getNewMsgId();
		msg.setMsgId(id);
		/* backup the message for the ack */
		mqtts_backup(MQTTS_BACKUP_MESSAGE, msg);
	
		/* timers */
		timer.register(ControlMessage.ACK, clientParms.getWaitingTime());
		timer.register(ControlMessage.KEEP_ALIVE, clientParms.getKeepAlivePeriod());
	
		clState= ClientState.WAITING_ACK;
		/* Send the message */
		ClientLogger.log(ClientLogger.INFO, "Send UNSUBSCRIBE to gateway: "+ Utils.hexString(msg.toBytes()));
		udpInterface.sendMsg(msg);
		return true;
	}

	public boolean register(String topicName){
		if (this.clState != ClientState.READY) {
			ClientLogger.log(ClientLogger.WARN, "client not ready, register() ignored! "+this.clState);
			return false;
		}
	
		MqttsRegister msg = new MqttsRegister();
		msg.setTopicId(0);
		int msgId = getNewMsgId();
		msg.setMsgId(msgId);
		msg.setTopicName(topicName);
	
		/* backup the message for the ack */
		mqtts_backup(MQTTS_BACKUP_MESSAGE, msg);
	
		/* timers */
		timer.register(ControlMessage.ACK, clientParms.getWaitingTime());
		timer.register(ControlMessage.KEEP_ALIVE, clientParms.getKeepAlivePeriod());
	
		/* we waiting for REGACK */
		clState= ClientState.WAITING_ACK;
		/* Send the message */
		ClientLogger.log(ClientLogger.INFO, "Send REGISTER to gateway: "+ Utils.hexString(msg.toBytes()));
		udpInterface.sendMsg(msg);
		return true;
	}

	public void terminate(){
		// terminate udp reader
		ClientLogger.log(ClientLogger.INFO, "Closing UDP ...");
		this.udpInterface.terminate();
		// unregister all timers
		ClientLogger.log(ClientLogger.INFO, "Stopping all timers ...");
		this.timer.terminate();
		// stop mqtts client
		ClientLogger.log(ClientLogger.INFO, "Stopping client and closing queue ...");
		this.running = false;
		this.queue.close();
		// wait until thread is terminated.
		try {
			this.readThread.join();
			ClientLogger.log(ClientLogger.INFO, "Client terminated ...");
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}

	public ClientParameters getClientParameters() {
		return clientParms;
	}

	public void setLogfile(String filename) {
		try {
			ClientLogger.setLogFile(filename);
		} catch (MqttsException e) {
			System.err.println("MqttsClient: cannot set log filename");
			e.printStackTrace();
		}
	}

	public void setLogLevel(int level) {
		ClientLogger.setLogLevel(level);
	}

	public void setWaitingTime(int t) {
		clientParms.setWaitingTime(t);
	}
	
	public int getLocalUDPPort() {
		return udpInterface.getUdpPort();
	}

	/******************************************************************************************/
	/**                      HANDLING OF MQTTS MESSAGES                                     **/
	/****************************************************************************************/

	private void handleMqttsMessage(MqttsMessage receivedMsg){	
		
		if (clState == ClientState.NOT_ACTIVE) {
			ClientLogger.log(ClientLogger.WARN, "Client not started yet, received msg ingored."); 
			return;
		}
		
		//get the type of the Mqtts message and handle the message according to that type	
		switch(receivedMsg.getMsgType()){
		case MqttsMessage.ADVERTISE:
			//handleMqttsAdvertise((MqttsAdvertise) receivedMsg);
			break;

		case MqttsMessage.SEARCHGW:
			//handleMqttsSearchGW((MqttsSearchGW) receivedMsg);
			break;

		case MqttsMessage.GWINFO:
			//handleMqttsGWInfo((MqttsGWInfo) receivedMsg);
			break;				

		case MqttsMessage.CONNECT:
			//we will never receive such a message from the gateway
			ClientLogger.log(ClientLogger.WARN, "CONNECT received and ignored!!!");
			break;

		case MqttsMessage.CONNACK:
			handleMqttsConnack((MqttsConnack) receivedMsg);
			break;

		case MqttsMessage.WILLTOPICREQ:
			handleMqttsWillTopicReq((MqttsWillTopicReq) receivedMsg);
			break;

		case MqttsMessage.WILLTOPIC:
			ClientLogger.log(ClientLogger.WARN, "WILLTOPIC received and ignored!!!");
			break;

		case MqttsMessage.WILLMSGREQ:
			handleMqttsWillMsgReq((MqttsWillMsgReq) receivedMsg);
			break;

		case MqttsMessage.WILLMSG:
			ClientLogger.log(ClientLogger.WARN, "WILLMSG received and ignored!!!");
			break;

		case MqttsMessage.REGISTER:
			handleMqttsRegister((MqttsRegister)receivedMsg);
			break;

		case MqttsMessage.REGACK:
			handleMqttsRegack((MqttsRegack) receivedMsg);
			break;

		case MqttsMessage.PUBLISH:
			handleMqttsPublish((MqttsPublish) receivedMsg);
			break;

		case MqttsMessage.PUBACK:
			handleMqttsPuback((MqttsPuback) receivedMsg);
			break;

		case MqttsMessage.PUBCOMP:
			handleMqttsPubComp((MqttsPubComp) receivedMsg);
			break;

		case MqttsMessage.PUBREC:
			handleMqttsPubRec((MqttsPubRec) receivedMsg);
			break;

		case MqttsMessage.PUBREL:
			handleMqttsPubRel((MqttsPubRel) receivedMsg);
			break;

		case MqttsMessage.SUBSCRIBE:
			ClientLogger.log(ClientLogger.WARN, "SUBSCRIBE received and ignored!!!");
			break;

		case MqttsMessage.SUBACK:
			handleMqttsSuback((MqttsSuback) receivedMsg);
			break;

		case MqttsMessage.UNSUBSCRIBE:
			ClientLogger.log(ClientLogger.WARN, "UNSUBSCRIBE and ignored received !!!");
			break;

		case MqttsMessage.UNSUBACK:
			handleMqttsUnsuback((MqttsUnsuback) receivedMsg);
			break;

		case MqttsMessage.PINGREQ:
			handleMqttsPingReq((MqttsPingReq) receivedMsg);
			break;

		case MqttsMessage.PINGRESP:
			handleMqttsPingResp((MqttsPingResp) receivedMsg);
			break;			

		case MqttsMessage.DISCONNECT:
			handleMqttsDisconnect((MqttsDisconnect) receivedMsg);
			break;

		case MqttsMessage.WILLTOPICUPD:
			ClientLogger.log(ClientLogger.WARN, "WILLTOPICUPD received and ignored!!!");
			break;

		case MqttsMessage.WILLTOPICRESP:
			handleMqttsWillTopicResp((MqttsWillTopicResp) receivedMsg);
			break;

		case MqttsMessage.WILLMSGUPD:
			ClientLogger.log(ClientLogger.WARN, "WILLMSGUPD received and ignored!!!");
			break;

		case MqttsMessage.WILLMSGRESP:
			handleMqttsWillMsgResp((MqttsWillMsgResp) receivedMsg);
			break;

		default:
			ClientLogger.log(ClientLogger.WARN, "MQTT-S message of unknown type \"" 
					+ receivedMsg.getMsgType()+"\" received and ignored!!!");
		break;
		}
	}


	private void handleMqttsWillTopicResp(MqttsWillTopicResp receivedMsg) {
		// TODO Auto-generated method stub

	}

	private void handleMqttsWillMsgResp(MqttsWillMsgResp receivedMsg) {
		// TODO Auto-generated method stub

	}
	
	private void ack_rx() {
		timer.unregister(ControlMessage.ACK);
		clState= ClientState.READY;
		if (lostGw) {
			callback.connected();
			lostGw = false;
		}
	}
	
	

	private void handleMqttsDisconnect(MqttsDisconnect receivedMsg) {
		ClientLogger.log(ClientLogger.INFO, "DISCONNECT received: " + 
				Utils.hexString(receivedMsg.toBytes()));

		timer.unregister(ControlMessage.ACK);
		timer.unregister(ControlMessage.KEEP_ALIVE);
		
		//hlt 19.03.2009 Because we cannot distinguish 
		//between a new gateway => client data still valid at broker
		//or a broker restart => client data deleted
		//it is better to inform app so that app can do a restart
		//e.g. reissue register and subscriptions
		clState = ClientState.WAITING_CONNECT;
		ClientLogger.log(ClientLogger.INFO, "Disconnected, waiting for connect");
		callback.disconnected(MqttsCallback.MQTTS_OK);
	}



	private void handleMqttsPingResp(MqttsPingResp receivedMsg) {
		ClientLogger.log(ClientLogger.INFO,"PINGRESP received: " +
				Utils.hexString(receivedMsg.toBytes()));

		switch (this.clState){
		case WAITING_ACK:
			ack_rx();
			break;
		default:
			break;
		}  		 	
	}



	private void handleMqttsPingReq(MqttsPingReq receivedMsg) {
		ClientLogger.log(ClientLogger.INFO, "PINGREQ received: " +
				Utils.hexString(receivedMsg.toBytes()));
		mqtts_pingresp();		
	}



	private void handleMqttsUnsuback(MqttsUnsuback receivedMsg) {
		ClientLogger.log(ClientLogger.INFO, "UNSUBACK received: " + 
				Utils.hexString(receivedMsg.toBytes()));

		switch (this.clState){
		case WAITING_ACK:
			if(receivedMsg.getMsgId() != ((MqttsUnsubscribe)this.message).getMsgId()){
				ClientLogger.log(ClientLogger.WARN, "MsgId (\""+
						receivedMsg.getMsgId()+"\") of the received Mqtts UNSUBACK message does not match the MsgId (\""+
						((MqttsUnsubscribe)this.message).getMsgId()+
						"\") of the stored Mqtts UNSUBSCRIBE message. The message cannot be processed.");
				return;
			}	
			
			ack_rx();
			callback.unsubackReceived();
			break;

		default:
			break;
		}  		
	}

	private void handleMqttsSuback(MqttsSuback receivedMsg) {
		ClientLogger.log(ClientLogger.INFO, "SUBACK received: " +
				Utils.hexString(receivedMsg.toBytes()));

		switch (this.clState){
		case WAITING_ACK:
			if(receivedMsg.getMsgId() != ((MqttsSubscribe)this.message).getMsgId()){
				ClientLogger.log(ClientLogger.WARN, "MsgId (\""+receivedMsg.getMsgId()+"\") of the received Mqtts SUBACK message does not match the MsgId (\""+((MqttsSubscribe)this.message).getMsgId()+"\") of the stored Mqtts SUBSCRIBE message. The message cannot be processed.");
				return;
			}		  		
			
			ack_rx();
			callback.subackReceived(receivedMsg.getGrantedQoS(), receivedMsg.getTopicId(),receivedMsg.getReturnCode());

			break;

		default:
			break;
		}  
	}

	private void handleMqttsPubRel(MqttsPubRel receivedMsg) {
		ClientLogger.log(ClientLogger.INFO, "PUBREL received: "+
				Utils.hexString(receivedMsg.toBytes()));
		// TODO procedure for QoS 2 not yet checked

	}

	private void handleMqttsPubRec(MqttsPubRec receivedMsg) {
		ClientLogger.log(ClientLogger.INFO, "PUBREC received: "+
				Utils.hexString(receivedMsg.toBytes()));
		
		// TODO procedure for QoS 2 not yet checked

		switch (this.clState){
		case WAITING_ACK:
			timer.unregister(ControlMessage.ACK);
			mqtts_pubrel(receivedMsg.getMsgId()); 
			break;	
		default:			   
			break;
		}  		
	}

	private void handleMqttsPubComp(MqttsPubComp receivedMsg) {
		ClientLogger.log(ClientLogger.INFO, "PUBCOMP received: "+
				Utils.hexString(receivedMsg.toBytes()));
		
		// TODO procedure for QoS 2 not yet checked

		switch (this.clState){		  		
		case WAITING_ACK:
			ack_rx();
			callback.pubCompReceived();
			break;
		default:			   
			break;
		}  		

	}



	private void handleMqttsPuback(MqttsPuback receivedMsg) {
		ClientLogger.log(ClientLogger.INFO, "PUBACK received: "+
				Utils.hexString(receivedMsg.toBytes()));

		switch (this.clState){
		case WAITING_ACK:
			if(receivedMsg.getMsgId() != ((MqttsPublish)this.message).getMsgId()){
				ClientLogger.log(ClientLogger.WARN, "MsgId (\""+receivedMsg.getMsgId()+"\") of the received Mqtts PUBACK message does not match the MsgId (\""+((MqttsPublish)this.message).getMsgId()+"\") of the stored Mqtts PUBLISH message. The message cannot be processed.");
				return;
			}			    
			
			ack_rx();
			callback.pubAckReceived(receivedMsg.getTopicId(), receivedMsg.getReturnCode());
			
			break;

		default:
			if (receivedMsg.getReturnCode() != MqttsMessage.RETURN_CODE_ACCEPTED){
				callback.pubAckReceived(receivedMsg.getTopicId(), receivedMsg.getReturnCode());
			}
		break;
		}
	}

	private void handleMqttsPublish(MqttsPublish receivedMsg) {
		ClientLogger.log(ClientLogger.INFO, "PUBLISH received: "+
				Utils.hexString(receivedMsg.toBytes()));
		
		int returnCode=-1;
		if (receivedMsg.getTopicIdType() == MqttsMessage.PREDIFINED_TOPIC_ID) {
			if (callback instanceof MqttsCallbackPreDefinedTopicId) {
				MqttsCallbackPreDefinedTopicId ecb = (MqttsCallbackPreDefinedTopicId)callback;
				returnCode = ecb.publishArrivedPreDefinedTopicId(receivedMsg.isRetain(), receivedMsg.getQos(),
						receivedMsg.getTopicId(),receivedMsg.getData());
			} else {
				ClientLogger.log(ClientLogger.ERROR, "Unexpected publish with predefined topicId received!");
				returnCode = MqttsMessage.RETURN_CODE_INVALID_TOPIC_ID;
			}
			
		} else {
			returnCode = callback.publishArrived(receivedMsg.isRetain(), receivedMsg.getQos(),
					receivedMsg.getTopicId(),receivedMsg.getData());
		}

		if (receivedMsg.getQos() == 1 || returnCode == MqttsMessage.RETURN_CODE_INVALID_TOPIC_ID) {
			mqtts_puback (receivedMsg.getTopicId(),receivedMsg.getMsgId(),returnCode);			
		}
		// TODO procedure for QoS 2 not yet checked
	}



	private void handleMqttsRegack(MqttsRegack receivedMsg) {
		ClientLogger.log(ClientLogger.INFO, "REGACK received: "+
				Utils.hexString(receivedMsg.toBytes()));

		switch (this.clState){
		case WAITING_ACK:
			
			if (!(this.message instanceof MqttsRegister)) {
				ClientLogger.log(ClientLogger.ERROR, "Unexpected message received: " + this.message.getMsgType());
				break;
			}
			
			if(receivedMsg.getMsgId() != ((MqttsRegister)this.message).getMsgId()){
				ClientLogger.log(ClientLogger.WARN, "MsgId (\""+receivedMsg.getMsgId()+"\") of the received Mqtts REGACK message does not match the MsgId (\""+((MqttsRegister)this.message).getMsgId()+"\") of the stored Mqtts REGISTER message. The message cannot be processed.");
				return;
			}		    
			
			ack_rx();
			callback.regAckReceived(receivedMsg.getTopicId(), receivedMsg.getReturnCode());
			break;
		default:
			break;
		}			
	}


	private void handleMqttsRegister(MqttsRegister receivedMsg) {
		ClientLogger.log(ClientLogger.INFO, "REGISTER received: " +
				Utils.hexString(receivedMsg.toBytes()));

		mqtts_regack(receivedMsg.getTopicId(), receivedMsg.getMsgId(), MqttsMessage.RETURN_CODE_ACCEPTED);
		callback.registerReceived(receivedMsg.getTopicId(), receivedMsg.getTopicName());
	}


	private void handleMqttsWillMsgReq(MqttsWillMsgReq receivedMsg) {
		ClientLogger.log(ClientLogger.INFO, "WILLMSGREQ received "+
				Utils.hexString(receivedMsg.toBytes()));
		timer.unregister(ControlMessage.ACK);
		mqtts_willmsg();
	}

	private void handleMqttsWillTopicReq(MqttsWillTopicReq receivedMsg) {
		ClientLogger.log(ClientLogger.INFO, "WILLTOPCREQ received: "+
				Utils.hexString(receivedMsg.toBytes()));
		timer.unregister(ControlMessage.ACK);
		mqtts_willtopic();
	}



	private void handleMqttsConnack(MqttsConnack receivedMsg) {
		ClientLogger.log(ClientLogger.INFO, "CONNACK received: " + 
				Utils.hexString(receivedMsg.toBytes()));

		switch (this.clState){
		case CONNECTING_TO_GW:
			if (receivedMsg.getReturnCode() == MqttsMessage.RETURN_CODE_ACCEPTED) {
				timer.unregister(ControlMessage.ACK);
				clState= ClientState.READY; 
				callback.connected();
				lostGw = false;
			} else {
				clState = ClientState.WAITING_CONNECT;
				timer.unregister(ControlMessage.ACK);
				timer.unregister(ControlMessage.KEEP_ALIVE);
				callback.disconnected(receivedMsg.getReturnCode());
			}
			break;	
		default:
			ClientLogger.log(ClientLogger.WARN,"CONNACK received in state "+ this.clState);
			break;
		}		
	}


	private int getNewMsgId(){
		return msgId ++;
	}



	/**
    send a PUBACK message
	 */

	private void mqtts_puback(int topicId, int msgId, int returnCode) {	
		//construct a Mqtts PUBACK message
		MqttsPuback puback = new MqttsPuback();

		puback.setTopicId(topicId);
		puback.setMsgId(msgId);
		puback.setReturnCode(returnCode);

		// re-start keep alive timer and send the message
		timer.register(ControlMessage.KEEP_ALIVE, clientParms.getKeepAlivePeriod());

		//send the Mqtts PUBACK message to the gateway	
		ClientLogger.log(ClientLogger.INFO, "Sending PUBACK message with \"TopicId\" = \"" +topicId+"\" to the gateway:"+ Utils.hexString(puback.toBytes()));
		udpInterface.sendMsg(puback);
	}

	/**
    send a REGACK message

	 */
	private void mqtts_regack(int topicId, int msgId, int returnCode) {

		//construct a Mqtts REGACK message
		MqttsRegack regack = new MqttsRegack();
		regack.setTopicId(topicId);
		regack.setMsgId(msgId);
		regack.setReturnCode(returnCode);

		// re-start keep alive timer and send the message*/
		timer.register(ControlMessage.KEEP_ALIVE, clientParms.getKeepAlivePeriod());

		//send the Mqtts REGACK message to the gateway	
		ClientLogger.log(ClientLogger.INFO, "Sending REGACK message with \"TopicId\" = \"" +topicId+"\" to the gateway :"+ Utils.hexString(regack.toBytes()));
		udpInterface.sendMsg(regack);
	}


	/**
    send a PUBREL message
	 */
	private void mqtts_pubrel(int msgId) {
		//construct a Mqtts PUBREL message
		MqttsPubRel msg = new MqttsPubRel();
		msg.setMsgId(msgId);

		//send the Mqtts PUBREL message
		ClientLogger.log(ClientLogger.INFO, "Sending PUBREL message to the gateway: "+ Utils.hexString(msg.toBytes()));
		
		timer.register(ControlMessage.ACK, clientParms.getWaitingTime());
		timer.register(ControlMessage.KEEP_ALIVE, clientParms.getKeepAlivePeriod());
		clState= ClientState.WAITING_ACK;

		/* we are waiting for PUBCOMP */
		/* TODO What happens if we do not rx a PUBCOMP ? Retransmit PUBREL? */
		/* backup the message for the ack*/
		mqtts_backup(MQTTS_BACKUP_MESSAGE, msg);
		
		udpInterface.sendMsg(msg);
	}  



	/**
    send an PINGREQ message
	 */
	private void mqtts_pingreq() {
		//construct a Mqtts PINGREQ message
		MqttsPingReq msg = new MqttsPingReq();

		mqtts_backup(MQTTS_BACKUP_MESSAGE, msg);
		timer.register(ControlMessage.ACK, clientParms.getWaitingTime());
		timer.register(ControlMessage.KEEP_ALIVE, clientParms.getKeepAlivePeriod());

		//send the Mqtts PINGREQ message to the client
		ClientLogger.log(ClientLogger.INFO, "Sending Mqtts PINGREQ message to the gateway: "+ Utils.hexString(msg.toBytes()));
		clState= ClientState.WAITING_ACK;
		udpInterface.sendMsg(msg);
	}

	/**
    send an PINGRESP message
	 */
	private void mqtts_pingresp() {
		//construct a Mqtts PINGREQ message
		MqttsPingResp msg = new MqttsPingResp();

		timer.register(ControlMessage.KEEP_ALIVE, clientParms.getKeepAlivePeriod());

		//send the Mqtts PINGRESP message to the client
		ClientLogger.log(ClientLogger.INFO, "Sending PINGRESP to the gateway: "+ Utils.hexString(msg.toBytes()));;
		udpInterface.sendMsg(msg);

	}

	/**
    send an WILLMSG message
	 */
	private void mqtts_willmsg() {

		//construct a Mqtts WILLMSG message
		MqttsWillMsg msg = new MqttsWillMsg();
		msg.setWillMsg(this.willmsg);


		/* backup the message for the ack*/
		mqtts_backup(MQTTS_BACKUP_MESSAGE, msg);

		/* star timers */
		timer.register(ControlMessage.ACK, clientParms.getWaitingTime());
		timer.register(ControlMessage.KEEP_ALIVE, clientParms.getKeepAlivePeriod());
		ClientLogger.log(ClientLogger.INFO, "Sending WILLMSG to the gateway: "+ Utils.hexString(msg.toBytes()));
		udpInterface.sendMsg(msg);

	}

	/**
    send an WILLTOPIC message
	 */
	private void mqtts_willtopic() {

		MqttsWillTopic msg = new MqttsWillTopic();
		msg.setQos(this.willQoS);
		msg.setRetain(this.willretain);
		msg.setWillTopic(this.willtopic);

		/* backup the message for the ack*/
		mqtts_backup(MQTTS_BACKUP_MESSAGE, msg);

		/* start timers */
		timer.register(ControlMessage.ACK, clientParms.getWaitingTime());
		timer.register(ControlMessage.KEEP_ALIVE, clientParms.getKeepAlivePeriod());
		ClientLogger.log(ClientLogger.INFO, "Sending WILLTOPIC to the gateway: "+ Utils.hexString(msg.toBytes()));
		udpInterface.sendMsg(msg);
	}

	/**
    send CONNECT message
	 */

	private void mqtts_connecting(String clientid , boolean will, boolean cleanstart, int keepalive) {
		MqttsConnect msg = new MqttsConnect();
		msg.setCleanSession(cleanstart);
		msg.setClientId(clientid);
		msg.setDuration(keepalive);
		msg.setWill(will);

		/* set the value of the keep_alive timer */
		clientParms.setKeepAlivePeriod(keepalive);
		
		/* backup the message for the ack */
		mqtts_backup(MQTTS_BACKUP_MESSAGE, msg);

		/* start ack and keep_alive timers */
		timer.register(ControlMessage.ACK, clientParms.getWaitingTime());
		timer.register(ControlMessage.KEEP_ALIVE, clientParms.getKeepAlivePeriod());

		udpInterface.sendMsg(msg);
		ClientLogger.log(ClientLogger.INFO, "CONNECT sent to gateway: "+ Utils.hexString(msg.toBytes()));
		
		callback.connectSent();

		return;
	}

	/* (non-Javadoc)
	 * @see java.lang.Runnable#run()
	 */
	 public void run() {
		while (running) {
			readMsg();
		}
	}


	private void readMsg() {
		//read the next available Message from queue

		Message msg = null;
		try {
			msg = (Message)queue.get();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}

		if (msg == null) {
			return;
		}
		//get the type of the message that "internal" message carries
		int type = msg.getType();
		switch(type){		
		case Message.MQTTS_MSG:
			ClientLogger.log(ClientLogger.INFO, "Processing an mqtts message ...");
			handleMqttsMessage(msg.getMqttsMessage());
			break;

		case Message.CONTROL_MSG:
			ClientLogger.log(ClientLogger.INFO, "Processing a control message ...");
			handleControlMessage(msg.getControlMessage());
			break;

		default:
			ClientLogger.log(ClientLogger.WARN,"Internal message of unknown type \"" + msg.getType()+"\" received.");
		break;
		}

	}


	private void handleControlMessage(ControlMessage controlMessage) {
		switch (controlMessage.getMsgType()) {
		case ControlMessage.ACK:
			ClientLogger.log(ClientLogger.INFO, "ACK timeout");
			ackMissedCounter++;
			
			if (ackMissedCounter > clientParms.getMaxRetries()) {
				
				//We log a warnning and inform app
				ClientLogger.log(ClientLogger.WARN, "Too many ACKs missed, lost gw ...");
				callback.disconnected(MqttsCallback.MQTTS_LOST_GATEWAY);
				
				if (autoReconnect) {
					mqtts_backup(MQTTS_BACKUP_SEND_MESSAGE, null);
					ackMissedCounter= 0;
					ClientLogger.log(ClientLogger.WARN, "will try re-connecting ...");
				} else {
					timer.unregister(ControlMessage.ACK);
					timer.unregister(ControlMessage.KEEP_ALIVE);
					clState = ClientState.WAITING_CONNECT;
					ClientLogger.log(ClientLogger.WARN, "Waiting for new connect from application ...");
				}
				
			} else mqtts_backup(MQTTS_BACKUP_SEND_MESSAGE, null);
			break;

		case ControlMessage.KEEP_ALIVE:
			ClientLogger.log(ClientLogger.INFO, "Keep Alive timeout, send PINGREQ");   	  
			mqtts_pingreq();
			break;

		default:
			break;
		}		
	}

	private void mqtts_backup(int action, MqttsMessage msg) {
		switch (action) {
		case MQTTS_BACKUP_MESSAGE: /* back up message */		
			this.message = msg;
			ClientLogger.log(ClientLogger.INFO, "Message backup for retransmission");
			break;

		case MQTTS_BACKUP_SEND_MESSAGE: /* resend message stored in backup */		
			timer.register(ControlMessage.KEEP_ALIVE, clientParms.getKeepAlivePeriod());
			timer.register(ControlMessage.ACK, clientParms.getWaitingTime());
			ClientLogger.log(ClientLogger.INFO, "Backup message resent:"+ Utils.hexString(this.message.toBytes()));
			udpInterface.sendMsg(this.message);
			
			break;

		default:
			break;
		}
		return;
	}




	/********************************************************************************/
	private enum ClientState {
		
		NOT_ACTIVE,
		WAITING_CONNECT,
		CONNECTING_TO_GW,
		READY,

		WAITING_ACK,
		
		SEARCHING_GW,
		DISCONNECTING;

	}
	
}